package org.dragonnova.business.web;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.servlet.http.HttpSession;

import org.dragonnova.business.common.Constants;
import org.dragonnova.business.dto.TagValueDto;
import org.dragonnova.business.model.DeviceTag;
import org.dragonnova.business.service.TagService;
import org.dragonnova.business.util.IOTBeanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;

import com.alibaba.druid.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.TypeReference;
import com.base.pub.websocket.SimpleWebSocketHandler;

/**
 * WebSocket handler that periodically pushes changed tag values to every
 * connected client.
 *
 * <p>Each session keeps its paging criteria ({@code indexPage},
 * {@code pageSize}, {@code orderType}, {@code searchText}) in its attribute
 * map. A single scheduled task, started lazily on the first connection, runs
 * every 10 seconds (after an initial 3 second delay), loads the page of tags
 * selected by those criteria for every session known to the session manager,
 * and sends only the entries that differ from the snapshot stored in the
 * session's {@code pageDto} attribute.
 */
public class TagUpdateWebSocketHandler extends SimpleWebSocketHandler {

	private final static Logger logger = LoggerFactory
			.getLogger(TagUpdateWebSocketHandler.class);

	@Autowired
	private WebSocketSessionManagerAdapter socketManager;

	@Autowired
	private TagService tagService;

	/** Lazily created scheduler; only touched inside {@link #ensureSchedulerStarted()}. */
	private ScheduledExecutorService executor = null;

	/**
	 * Initialises the session, copies the initial paging criteria from the
	 * {@code parameters} attribute into typed session attributes and makes
	 * sure the periodic push task is running.
	 *
	 * @param session the newly established WebSocket session
	 * @throws Exception if the {@code parameters} attribute is missing or
	 *             malformed, or if session initialisation fails
	 */
	@Override
	public void afterConnectionEstablished(WebSocketSession session)
			throws Exception {
		super.initSession(session);

		Map<?, ?> parameters = (Map<?, ?>) JSON.parse((String) session
				.getAttributes().get("parameters"));
		session.getAttributes().remove("parameters");
		// Each entry is expected to be a JSON array; only the first element is used.
		String searchText = ((JSONArray) parameters.get("searchText"))
				.getString(0);
		String indexPage = ((JSONArray) parameters.get("indexPage"))
				.getString(0);
		String orderType = ((JSONArray) parameters.get("orderType"))
				.getString(0);
		String pageSize = ((JSONArray) parameters.get("pageSize")).getString(0);
		session.getAttributes().put("indexPage", Integer.valueOf(indexPage));
		session.getAttributes().put("orderType", Integer.valueOf(orderType));
		session.getAttributes().put("pageSize", Integer.valueOf(pageSize));
		session.getAttributes().put("searchText", searchText);

		ScheduledExecutorService scheduler = ensureSchedulerStarted();
		if (scheduler.isShutdown()) {
			logger.debug("当前线程池状态 ：shutdown");
		}
		if (scheduler.isTerminated()) {
			logger.debug("当前线程池状态 ：Terminated");
		}
		if (logger.isDebugEnabled()) {
			logger.debug("新的客户端接入，当前客户端数量：" + socketManager.getClientCount());
		}
	}

	/**
	 * Creates the scheduler and schedules the push task on first use.
	 *
	 * <p>Synchronized so that two clients connecting at the same time cannot
	 * both observe {@code executor == null} and start duplicate push tasks.
	 *
	 * @return the (possibly just created) scheduler
	 */
	private synchronized ScheduledExecutorService ensureSchedulerStarted() {
		if (executor == null) {
			executor = Executors.newScheduledThreadPool(3);
			executor.scheduleAtFixedRate(new TagValueTimerTask(), 3000, 10000,
					TimeUnit.MILLISECONDS);
		}
		return executor;
	}

	/**
	 * Updates the session's paging criteria from a JSON object sent by the
	 * client. Only the keys present in the message are changed.
	 *
	 * <p>Integer criteria accept any JSON number or a numeric string; values
	 * that cannot be converted are ignored with a warning instead of failing
	 * the whole message. A {@code null} {@code searchText} clears the stored
	 * search text.
	 *
	 * @param session the session that sent the message
	 * @param message the text frame; its payload must be a JSON object
	 * @throws Exception if the payload is not valid JSON
	 */
	@Override
	protected void handleTextMessage(WebSocketSession session,
			TextMessage message) throws Exception {
		HttpSession httpSession = (HttpSession) session.getAttributes().get(
				"httpsession");
		// NOTE(review): the result was never used; the call is kept in case
		// getUserId validates the HTTP session as a side effect - confirm.
		WebSocketSessionManager.getUserId(httpSession);

		Object parsed = JSON.parse(message.getPayload());
		if (!(parsed instanceof Map)) {
			logger.warn("Ignoring websocket message that is not a JSON object");
			return;
		}
		Map<?, ?> data = (Map<?, ?>) parsed;

		updateIntegerAttribute(session, data, "indexPage");
		if (data.containsKey("searchText")) {
			Object searchText = data.get("searchText");
			if (searchText == null) {
				// The attribute map may reject null values, so clear the entry instead.
				session.getAttributes().remove("searchText");
			} else {
				session.getAttributes().put("searchText", searchText.toString());
			}
		}
		updateIntegerAttribute(session, data, "orderType");
		updateIntegerAttribute(session, data, "pageSize");
	}

	/**
	 * Stores {@code data[key]} as an {@link Integer} session attribute when
	 * the key is present and its value can be converted.
	 */
	private void updateIntegerAttribute(WebSocketSession session,
			Map<?, ?> data, String key) {
		if (!data.containsKey(key)) {
			return;
		}
		Object raw = data.get(key);
		Integer value = toInteger(raw);
		if (value == null) {
			logger.warn("Ignoring non-integer value for '{}': {}", key, raw);
			return;
		}
		session.getAttributes().put(key, value);
	}

	/**
	 * Converts a parsed JSON value to an {@link Integer}.
	 *
	 * @param value a parsed JSON value, may be {@code null}
	 * @return the integer value, or {@code null} when {@code value} is
	 *         neither a number nor a numeric string
	 */
	private Integer toInteger(Object value) {
		if (value instanceof Integer) {
			return (Integer) value;
		}
		if (value instanceof Number) {
			return Integer.valueOf(((Number) value).intValue());
		}
		if (value instanceof String) {
			try {
				return Integer.valueOf(((String) value).trim());
			} catch (NumberFormatException e) {
				return null;
			}
		}
		return null;
	}

	/**
	 * Logs a transport error at debug level; no other action is taken here.
	 */
	@Override
	public void handleTransportError(WebSocketSession session, Throwable thrwbl)
			throws Exception {
		if (logger.isDebugEnabled()) {
			logger.debug("websocket  transportError : " + thrwbl.getMessage());
		}
	}

	/**
	 * Removes the closed session from the session manager and logs the
	 * remaining client count.
	 */
	@Override
	public void afterConnectionClosed(WebSocketSession session, CloseStatus cs)
			throws Exception {
		socketManager.removeSession(session);
		logger.info("有客户端断开连接，当前客户端数量：" + socketManager.getClientCount());
		if (logger.isDebugEnabled()) {
			logger.debug("websocket connection closed...... " + cs.getCode());
		}
	}

	@Override
	public boolean supportsPartialMessages() {
		return false;
	}

	/**
	 * Loads the page of tags selected by the paging criteria stored in the
	 * session attributes.
	 *
	 * @param userId user id passed through to the tag service
	 * @param session session whose attributes hold the paging criteria
	 * @return the tags on the requested page, or {@code null} when the
	 *         criteria are missing or non-positive, or when the lookup fails
	 */
	public List<DeviceTag> findListerPageData(Integer userId,
			WebSocketSession session) {
		Integer indexPage = (Integer) session.getAttributes().get("indexPage");
		Integer orderType = (Integer) session.getAttributes().get("orderType");
		Integer pageSize = (Integer) session.getAttributes().get("pageSize");
		String searchText = (String) session.getAttributes().get("searchText");
		if (indexPage == null || pageSize == null || indexPage <= 0
				|| pageSize <= 0) {
			return null;
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Page condition  indexPage:" + indexPage
					+ " orderType:" + orderType + " pageSize:" + pageSize
					+ " searchText:" + searchText);
		}
		try {
			return tagService.findTagByPage(userId, indexPage, pageSize,
					orderType, searchText).getList();
		} catch (Exception e) {
			// Callers treat null as "nothing to push"; keep that contract but
			// record the failure through the logger instead of stderr.
			logger.error("Failed to load tag page  indexPage:" + indexPage
					+ " pageSize:" + pageSize, e);
			return null;
		}
	}

	/**
	 * Periodic task that pushes changed tag values to every session.
	 */
	class TagValueTimerTask implements Runnable {

		@Override
		public void run() {
			try {
				send();
			} catch (RuntimeException e) {
				// scheduleAtFixedRate cancels all further runs once a run
				// throws, so a failure must never escape this method.
				logger.error("Tag value push task failed", e);
			}
		}

		/**
		 * Sends each open session the tag values that changed since the
		 * snapshot stored in its {@code pageDto} attribute. A failure for
		 * one session is logged and does not affect the others.
		 */
		public void send() {
			for (Entry<Integer, ConcurrentWebSocketSessionDecorator> item : socketManager
					.getWebSockets()) {
				ConcurrentWebSocketSessionDecorator session = item.getValue();
				try {
					pushChanges(session);
				} catch (IOException e) {
					logger.error("用户 WebSocket异常   userId:" + item.getKey(), e);
				} catch (RuntimeException e) {
					logger.error("用户 WebSocket异常   userId:" + item.getKey(), e);
				}
			}
		}

		/**
		 * Computes and sends the diff for one session, advancing the stored
		 * snapshot only once the diff has been delivered (or was empty).
		 */
		private void pushChanges(ConcurrentWebSocketSessionDecorator session)
				throws IOException {
			if (!session.isOpen()) {
				// Skip the database query for sessions that cannot receive data.
				return;
			}
			// NOTE(review): the user id is hard-coded to -10 rather than taken
			// from the session entry; its meaning is defined by TagService.
			List<DeviceTag> list = findListerPageData(-10, session);
			if (list == null) {
				return;
			}
			Map<String, TagValueDto> oldData = JSON.parseObject(
					(String) session.getAttributes().get("pageDto"),
					new TypeReference<Map<String, TagValueDto>>() {
					});
			Map<String, TagValueDto> newData = formatData(list);
			String json = getUpdateTag(newData, oldData);
			if (!StringUtils.isEmpty(json)) {
				if (!session.isOpen()) {
					// Closed while we were querying: keep the old snapshot.
					return;
				}
				session.sendMessage(new TextMessage(json));
			}
			// Stored after the send so that a failed send is retried on the
			// next run instead of the change being lost.
			session.getAttributes().put("pageDto", JSON.toJSONString(newData));
		}
	}

	/**
	 * Builds the JSON message containing the entries of {@code newData} that
	 * are new or differ from {@code oldData}.
	 *
	 * @param newData current values keyed by tag id
	 * @param oldData previous snapshot, or {@code null} when none exists, in
	 *            which case every entry of {@code newData} is reported
	 * @return the packaged message, or {@code null} when nothing changed
	 */
	private String getUpdateTag(Map<String, TagValueDto> newData,
			Map<String, TagValueDto> oldData) {
		Map<String, TagValueDto> result;
		if (oldData == null) {
			result = newData;
		} else {
			result = new HashMap<>();
			for (Entry<String, TagValueDto> item : newData.entrySet()) {
				TagValueDto previous = oldData.get(item.getKey());
				boolean unchanged = oldData.containsKey(item.getKey())
						&& compare(item.getValue(), previous);
				if (!unchanged) {
					result.put(item.getKey(), item.getValue());
				}
			}
		}
		if (result.isEmpty()) {
			return null;
		}
		return WebSocketSessionManager.packagingData(
				Constants.RESULT_NORMAL_INFO, "", result);
	}

	/**
	 * Returns {@code true} when both values have the same temperature and
	 * humidity.
	 *
	 * <p>NOTE(review): if the getters return boxed types that may be
	 * {@code null}, the subtraction throws; the push task catches and logs
	 * that per session.
	 */
	private boolean compare(TagValueDto d1, TagValueDto d2) {
		if (Math.abs(d1.getTemp() - d2.getTemp()) > 0) {
			return false;
		}
		if (Math.abs(d1.getHum() - d2.getHum()) > 0) {
			return false;
		}
		return true;
	}

	/**
	 * Converts the tags to a map of tag id to value DTO. Tags without a
	 * value are mapped to an empty {@link TagValueDto}.
	 */
	private Map<String, TagValueDto> formatData(List<DeviceTag> list) {
		Map<String, TagValueDto> result = new HashMap<>();
		for (DeviceTag tag : list) {
			TagValueDto valueDto = new TagValueDto();
			if (tag.getTagValue() != null) {
				IOTBeanUtils.copyPropertiesIgnoreNull(tag.getTagValue(),
						valueDto);
			}
			result.put(tag.getId(), valueDto);
		}
		return result;
	}

}
